blob: 177c0e68052c6be401ebd8fbb989013794250f6a [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.mrunit.mapreduce;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.StringTokenizer;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mrunit.types.Pair;
import org.junit.Before;
import org.junit.Test;
/**
 * MRUnit tests for the classic word-count job. The mapper alone, the reducer alone, and the
 * combined map/reduce pipeline are each checked against the same two input lines.
 */
public class TestWordCount {
  private static final String FILE01 = "Hello World Bye World";
  private static final String FILE02 = "Hello Hadoop Goodbye Hadoop";
  private static final int ONE = 1;
  private static final int TWO = 2;

  private Mapper<LongWritable, Text, Text, IntWritable> mapper;
  private Reducer<Text, IntWritable, Text, IntWritable> reducer;
  private MapReduceDriver<LongWritable, Text, Text, IntWritable, Text, IntWritable> driver;
  private MapDriver<LongWritable, Text, Text, IntWritable> mapDriver;
  private ReduceDriver<Text, IntWritable, Text, IntWritable> reduceDriver;

  /** Final reducer output for FILE01 + FILE02: one (word, total) pair per distinct word. */
  private List<Pair<Text, IntWritable>> expectedOutput;

  /** Creates fresh mapper/reducer instances and MRUnit drivers before every test. */
  @Before
  public void setup() {
    mapper = new Map();
    reducer = new Reduce();
    driver = MapReduceDriver.newMapReduceDriver(mapper, reducer);
    mapDriver = MapDriver.newMapDriver(mapper);
    reduceDriver = ReduceDriver.newReduceDriver(reducer);

    // Listed in lexicographic key order. runTest(true) is presumably order-sensitive, and the
    // end-to-end run is expected to emit keys sorted -- confirm against the MRUnit version.
    expectedOutput = new ArrayList<Pair<Text, IntWritable>>();
    expectedOutput.add(wordCount("Bye", ONE));
    expectedOutput.add(wordCount("Goodbye", ONE));
    expectedOutput.add(wordCount("Hadoop", TWO));
    expectedOutput.add(wordCount("Hello", TWO));
    expectedOutput.add(wordCount("World", TWO));
  }

  /** Verifies the mapper emits (token, 1) for every token, in input order. */
  @Test
  public void TestMapDriver() throws IOException {
    final List<Pair<Text, IntWritable>> outputs = new ArrayList<Pair<Text, IntWritable>>();
    // Tokens of FILE01.
    outputs.add(wordCount("Hello", ONE));
    outputs.add(wordCount("World", ONE));
    outputs.add(wordCount("Bye", ONE));
    outputs.add(wordCount("World", ONE));
    // Tokens of FILE02.
    outputs.add(wordCount("Hello", ONE));
    outputs.add(wordCount("Hadoop", ONE));
    outputs.add(wordCount("Goodbye", ONE));
    outputs.add(wordCount("Hadoop", ONE));
    mapDriver.withAll(fileInputs()).withAllOutput(outputs).runTest(true);
  }

  /** Verifies the reducer sums pre-grouped counts into the expected per-word totals. */
  @Test
  public void TestReduceDriver() throws IOException {
    // The value lists are shared between keys; the reducer only reads them.
    final List<IntWritable> seenOnce = new ArrayList<IntWritable>();
    seenOnce.add(new IntWritable(ONE));
    final List<IntWritable> seenTwice = new ArrayList<IntWritable>();
    seenTwice.add(new IntWritable(ONE));
    seenTwice.add(new IntWritable(ONE));

    final List<Pair<Text, List<IntWritable>>> inputs =
        new ArrayList<Pair<Text, List<IntWritable>>>();
    inputs.add(new Pair<Text, List<IntWritable>>(new Text("Bye"), seenOnce));
    inputs.add(new Pair<Text, List<IntWritable>>(new Text("Goodbye"), seenOnce));
    inputs.add(new Pair<Text, List<IntWritable>>(new Text("Hadoop"), seenTwice));
    inputs.add(new Pair<Text, List<IntWritable>>(new Text("Hello"), seenTwice));
    inputs.add(new Pair<Text, List<IntWritable>>(new Text("World"), seenTwice));
    reduceDriver.withAll(inputs).withAllOutput(expectedOutput).runTest(true);
  }

  /** Verifies the full map -> shuffle -> reduce pipeline produces the final word counts. */
  @Test
  public void TestRun() throws IOException {
    driver.withAll(fileInputs()).withAllOutput(expectedOutput).runTest(true);
  }

  /** Builds the (word, count) pair shape emitted by both the mapper and the reducer. */
  private static Pair<Text, IntWritable> wordCount(String word, int count) {
    return new Pair<Text, IntWritable>(new Text(word), new IntWritable(count));
  }

  /** Builds the two input records shared by the mapper-only and end-to-end tests. */
  private static List<Pair<LongWritable, Text>> fileInputs() {
    final List<Pair<LongWritable, Text>> inputs = new ArrayList<Pair<LongWritable, Text>>();
    // The key values are arbitrary: Map.map never reads its key argument.
    inputs.add(new Pair<LongWritable, Text>(new LongWritable(21), new Text(FILE01)));
    inputs.add(new Pair<LongWritable, Text>(new LongWritable(48), new Text(FILE02)));
    return inputs;
  }

  /**
   * Word count mapper. It splits each input line with a default {@link StringTokenizer}
   * (space, tab, newline, carriage return, form feed) and emits (token, 1) for every token.
   *
   * <p>Declared {@code static} so it holds no hidden reference to the enclosing test instance
   * and has a genuine no-argument constructor. That constructor is presumably required if this
   * class is ever instantiated reflectively by Hadoop.
   */
  public static class Map extends Mapper<LongWritable, Text, Text, IntWritable> {
    private final IntWritable one = new IntWritable(1);
    // Reused for every emitted key to avoid per-token allocation. This presumes the context
    // copies or serializes each pair on write -- NOTE(review): confirm for the MRUnit in use.
    private final Text word = new Text();

    /**
     * Emits (token, 1) for each token of {@code value}; {@code key} is ignored.
     *
     * @param key ignored
     * @param value one line of input text
     * @param context sink for the emitted (word, 1) pairs
     * @throws IOException if writing to the context fails
     * @throws InterruptedException if writing to the context is interrupted
     */
    @Override
    public void map(LongWritable key, Text value, Context context)
        throws IOException, InterruptedException {
      StringTokenizer tokenizer = new StringTokenizer(value.toString());
      while (tokenizer.hasMoreTokens()) {
        word.set(tokenizer.nextToken());
        context.write(word, one);
      }
    }
  }

  /**
   * Word count reducer. It sums all counts received for a word and emits (word, total).
   *
   * <p>Declared {@code static} for the same reasons as {@link Map}.
   */
  public static class Reduce extends Reducer<Text, IntWritable, Text, IntWritable> {
    /**
     * Emits {@code key} together with the sum of {@code values}.
     *
     * @param key the word being counted
     * @param values partial counts for {@code key}
     * @param context sink for the (word, total) pair
     * @throws IOException if writing to the context fails
     * @throws InterruptedException if writing to the context is interrupted
     */
    @Override
    public void reduce(Text key, Iterable<IntWritable> values, Context context)
        throws IOException, InterruptedException {
      int sum = 0;
      for (IntWritable val : values) {
        sum += val.get();
      }
      context.write(key, new IntWritable(sum));
    }
  }
}