/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.mrunit;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.serializer.JavaSerializationComparator;
import org.apache.hadoop.mapred.*;
import org.apache.hadoop.mapred.lib.IdentityMapper;
import org.apache.hadoop.mapred.lib.IdentityReducer;
import org.apache.hadoop.mapred.lib.LongSumReducer;
import org.apache.hadoop.mrunit.types.Pair;
import org.apache.hadoop.mrunit.types.TestWritable;
import org.apache.hadoop.mrunit.types.UncomparableWritable;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

import static org.apache.hadoop.mrunit.ExtendedAssert.assertListEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;

public class TestMultipleInputsMapReduceDriver {
  @Rule
  public final ExpectedSuppliedException thrown = ExpectedSuppliedException
      .none();

  private static final int FOO_IN_A = 42;
  private static final int FOO_IN_B = 10;
  private static final int TOKEN_IN_A = 1;
  private static final int TOKEN_IN_B = 2;
  private static final int BAR_IN = 12;
  private static final int BAR_OUT = BAR_IN + TOKEN_IN_A + TOKEN_IN_B;
  private static final int FOO_OUT = FOO_IN_A + FOO_IN_B + TOKEN_IN_A + 2
      * TOKEN_IN_B;
  private static final String TOKEN_A = "foo bar";
  private static final String TOKEN_B = "foo foo bar";

  private Mapper<Text, LongWritable, Text, LongWritable> mapper;
  private Reducer<Text, LongWritable, Text, LongWritable> reducer;
  private TokenMapper tokenMapper;
  private MultipleInputsMapReduceDriver<Text, LongWritable, Text, LongWritable> driver;

  @Before
  public void setUp() {
    mapper = new IdentityMapper<Text, LongWritable>();
    reducer = new LongSumReducer<Text>();
    tokenMapper = new TokenMapper();
    driver = new MultipleInputsMapReduceDriver<Text, LongWritable, Text, LongWritable>(
        reducer);
    driver.addMapper(mapper);
    driver.addMapper(tokenMapper);
  }

  @Test
  public void testRun() throws IOException {
    final List<Pair<Text, LongWritable>> out = driver
        .withInput(mapper, new Text("foo"), new LongWritable(FOO_IN_A))
        .withInput(mapper, new Text("foo"), new LongWritable(FOO_IN_B))
        .withInput(mapper, new Text("bar"), new LongWritable(BAR_IN))
        .withInput(tokenMapper, new LongWritable(TOKEN_IN_A), new Text(TOKEN_A))
        .withInput(tokenMapper, new LongWritable(TOKEN_IN_B), new Text(TOKEN_B))
        .run();

    final List<Pair<Text, LongWritable>> expected = new ArrayList<Pair<Text, LongWritable>>();
    expected.add(new Pair<Text, LongWritable>(new Text("bar"),
        new LongWritable(BAR_OUT)));
    expected.add(new Pair<Text, LongWritable>(new Text("foo"),
        new LongWritable(FOO_OUT)));

    assertListEquals(expected, out);
  }

  @Test
  public void testUncomparable() throws IOException {
    MultipleInputsMapReduceDriver<Text, Object, Text, Object> testDriver = MultipleInputsMapReduceDriver
        .newMultipleInputMapReduceDriver(new IdentityReducer<Text, Object>());

    Mapper<Text, Object, Text, Object> identity = new IdentityMapper<Text, Object>();
    testDriver.addMapper(identity);
    Text k1 = new Text("foo");
    Object v1 = new UncomparableWritable(1);
    testDriver.withInput(identity, k1, v1);

    ReverseIdentityMapper<Object, Text> reverse = new ReverseIdentityMapper<Object, Text>();
    testDriver.addMapper(reverse);
    Text k2 = new Text("bar");
    Object v2 = new UncomparableWritable(2);
    testDriver.withInput(reverse, v2, k2);

    testDriver.withOutput(k1, v1).withOutput(k2, v2);

    testDriver.runTest(false);
  }

  @Test
  public void testTestRun() throws IOException {
    driver
        .withInput(mapper, new Text("foo"), new LongWritable(FOO_IN_A))
        .withInput(mapper, new Text("foo"), new LongWritable(FOO_IN_B))
        .withInput(mapper, new Text("bar"), new LongWritable(BAR_IN))
        .withInput(tokenMapper, new LongWritable(TOKEN_IN_A), new Text(TOKEN_A))
        .withInput(tokenMapper, new LongWritable(TOKEN_IN_B), new Text(TOKEN_B))
        .withOutput(new Text("bar"), new LongWritable(BAR_OUT))
        .withOutput(new Text("foo"), new LongWritable(FOO_OUT)).runTest(false);
  }

  @Test
  public void testAddAll() throws IOException {
    final List<Pair<Text, LongWritable>> mapperInputs = new ArrayList<Pair<Text, LongWritable>>();
    mapperInputs.add(new Pair<Text, LongWritable>(new Text("foo"),
        new LongWritable(FOO_IN_A)));
    mapperInputs.add(new Pair<Text, LongWritable>(new Text("foo"),
        new LongWritable(FOO_IN_B)));
    mapperInputs.add(new Pair<Text, LongWritable>(new Text("bar"),
        new LongWritable(BAR_IN)));

    final List<Pair<LongWritable, Text>> tokenMapperInputs = new ArrayList<Pair<LongWritable, Text>>();
    tokenMapperInputs.add(new Pair<LongWritable, Text>(new LongWritable(
        TOKEN_IN_A), new Text(TOKEN_A)));
    tokenMapperInputs.add(new Pair<LongWritable, Text>(new LongWritable(
        TOKEN_IN_B), new Text(TOKEN_B)));

    final List<Pair<Text, LongWritable>> outputs = new ArrayList<Pair<Text, LongWritable>>();
    outputs.add(new Pair<Text, LongWritable>(new Text("bar"), new LongWritable(
        BAR_OUT)));
    outputs.add(new Pair<Text, LongWritable>(new Text("foo"), new LongWritable(
        FOO_OUT)));

    driver.withAll(mapper, mapperInputs)
        .withAll(tokenMapper, tokenMapperInputs).withAllOutput(outputs)
        .runTest(false);
  }

  @Test
  public void testNoInput() throws IOException {
    thrown.expectMessage(IllegalStateException.class,
        "No input was provided for mapper");
    driver.runTest(false);
  }

  @Test
  public void testNoInputForMapper() throws IOException {
    MultipleInputsMapReduceDriver<Text, LongWritable, Text, LongWritable> testDriver = new MultipleInputsMapReduceDriver<Text, LongWritable, Text, LongWritable>();
    testDriver.addMapper(mapper);
    testDriver.addMapper(tokenMapper);
    testDriver.withInput(mapper, new Text("foo"), new LongWritable(FOO_IN_A));
    thrown.expectMessage(IllegalStateException.class,
        String.format("No input was provided for mapper %s", tokenMapper));
    testDriver.runTest(false);
  }

  @Test
  public void testNoReducer() throws IOException {
    MultipleInputsMapReduceDriver<Text, LongWritable, Text, LongWritable> testDriver = new MultipleInputsMapReduceDriver<Text, LongWritable, Text, LongWritable>();
    testDriver.addMapper(mapper);
    testDriver.withInput(mapper, new Text("foo"), new LongWritable(FOO_IN_A));
    thrown.expectMessage(IllegalStateException.class,
        "No reducer class was provided");
    testDriver.runTest(false);
  }

  @Test
  public void testIdentityCombiner() throws IOException {
    driver
        .withCombiner(new IdentityReducer<Text, LongWritable>())
        .withInput(mapper, new Text("foo"), new LongWritable(FOO_IN_A))
        .withInput(mapper, new Text("foo"), new LongWritable(FOO_IN_B))
        .withInput(mapper, new Text("bar"), new LongWritable(BAR_IN))
        .withInput(tokenMapper, new LongWritable(TOKEN_IN_A), new Text(TOKEN_A))
        .withInput(tokenMapper, new LongWritable(TOKEN_IN_B), new Text(TOKEN_B))
        .withOutput(new Text("foo"), new LongWritable(FOO_OUT))
        .withOutput(new Text("bar"), new LongWritable(BAR_OUT)).runTest(false);
  }

  @Test
  public void testLongSumCombiner() throws IOException {
    driver
        .withCombiner(new LongSumReducer<Text>())
        .withInput(mapper, new Text("foo"), new LongWritable(FOO_IN_A))
        .withInput(mapper, new Text("foo"), new LongWritable(FOO_IN_B))
        .withInput(mapper, new Text("bar"), new LongWritable(BAR_IN))
        .withInput(tokenMapper, new LongWritable(TOKEN_IN_A), new Text(TOKEN_A))
        .withInput(tokenMapper, new LongWritable(TOKEN_IN_B), new Text(TOKEN_B))
        .withOutput(new Text("foo"), new LongWritable(FOO_OUT))
        .withOutput(new Text("bar"), new LongWritable(BAR_OUT)).runTest(false);
  }

  @Test
  public void testLongSumCombinerAndIdentityReducer() throws IOException {
    driver
        .withCombiner(new LongSumReducer<Text>())
        .withReducer(new IdentityReducer<Text, LongWritable>())
        .withInput(mapper, new Text("foo"), new LongWritable(FOO_IN_A))
        .withInput(mapper, new Text("foo"), new LongWritable(FOO_IN_B))
        .withInput(mapper, new Text("bar"), new LongWritable(BAR_IN))
        .withInput(tokenMapper, new LongWritable(TOKEN_IN_A), new Text(TOKEN_A))
        .withInput(tokenMapper, new LongWritable(TOKEN_IN_B), new Text(TOKEN_B))
        .withOutput(new Text("foo"), new LongWritable(FOO_OUT))
        .withOutput(new Text("bar"), new LongWritable(BAR_OUT)).runTest(false);
  }

  @Test
  public void testRepeatRun() throws IOException {
    driver
        .withCombiner(new IdentityReducer<Text, LongWritable>())
        .withInput(mapper, new Text("foo"), new LongWritable(FOO_IN_A))
        .withInput(mapper, new Text("foo"), new LongWritable(FOO_IN_B))
        .withInput(mapper, new Text("bar"), new LongWritable(BAR_IN))
        .withInput(tokenMapper, new LongWritable(TOKEN_IN_A), new Text(TOKEN_A))
        .withInput(tokenMapper, new LongWritable(TOKEN_IN_B), new Text(TOKEN_B))
        .withOutput(new Text("foo"), new LongWritable(FOO_OUT))
        .withOutput(new Text("bar"), new LongWritable(BAR_OUT)).runTest(false);
    thrown.expectMessage(IllegalStateException.class,
        "Driver reuse not allowed");
    driver.runTest(false);
  }

  // Test the key grouping and value ordering comparators
  @Test
  public void testComparators() throws IOException {
    // reducer to track the order of the input values using bit shifting
    driver.withReducer(new Reducer<Text, LongWritable, Text, LongWritable>() {
      @Override
      public void reduce(final Text key, final Iterator<LongWritable> values,
          final OutputCollector<Text, LongWritable> output,
          final Reporter reporter) throws IOException {
        long outputValue = 0;
        int count = 0;
        while (values.hasNext()) {
          outputValue |= (values.next().get() << (count++ * 8));
        }

        output.collect(key, new LongWritable(outputValue));
      }

      @Override
      public void configure(final JobConf job) {
      }

      @Override
      public void close() throws IOException {
      }
    });

    driver
        .withKeyGroupingComparator(new TestMapReduceDriver.FirstCharComparator());
    driver
        .withKeyOrderComparator(new TestMapReduceDriver.SecondCharComparator());

    driver.addInput(mapper, new Text("a1"), new LongWritable(1));
    driver.addInput(mapper, new Text("b1"), new LongWritable(1));
    driver.addInput(mapper, new Text("a3"), new LongWritable(3));
    driver.addInput(mapper, new Text("a2"), new LongWritable(2));

    driver.addInput(tokenMapper, new LongWritable(1), new Text("c1 d1"));

    driver.addOutput(new Text("a1"), new LongWritable(0x1));
    driver.addOutput(new Text("b1"), new LongWritable(0x1));
    driver.addOutput(new Text("a2"), new LongWritable(0x2 | (0x3 << 8)));
    driver.addOutput(new Text("c1"), new LongWritable(0x1));
    driver.addOutput(new Text("d1"), new LongWritable(0x1));

    driver.runTest(false);
  }

  @Test
  public void testNoMapper() throws IOException {
    MultipleInputsMapReduceDriver<Text, LongWritable, Text, LongWritable> testDriver = new MultipleInputsMapReduceDriver<Text, LongWritable, Text, LongWritable>();
    testDriver.withReducer(reducer);
    thrown.expectMessage(IllegalStateException.class,
        "No mappers were provided");
    testDriver.runTest(false);
  }

  @Test
  public void testWithCounter() throws IOException {
    MultipleInputsMapReduceDriver<Text, Text, Text, Text> testDriver = new MultipleInputsMapReduceDriver<Text, Text, Text, Text>();
    Mapper<Text, Text, Text, Text> mapperWithCounters = new TestMapDriver.MapperWithCounters<Text, Text, Text, Text>();
    Mapper<Text, Text, Text, Text> tokenMapperWithCounters = new TokenMapperWithCounters();
    testDriver
        .withMapper(mapperWithCounters)
        .withInput(mapperWithCounters, new Text("hie"), new Text("Hi"))
        .withMapper(tokenMapperWithCounters)
        .withInput(tokenMapperWithCounters, new Text("bie"),
            new Text("Goodbye Bye"))
        .withOutput(new Text("hie"), new Text("Hi"))
        .withOutput(new Text("bie"), new Text("Goodbye"))
        .withOutput(new Text("bie"), new Text("Bye"))
        .withCounter(TestMapDriver.MapperWithCounters.Counters.X, 1)
        .withCounter(TokenMapperWithCounters.Counters.Y, 2)
        .withCounter("category", "name", 3)
        .withReducer(
            new TestReduceDriver.ReducerWithCounters<Text, Text, Text, Text>())
        .withCounter(TestReduceDriver.ReducerWithCounters.Counters.COUNT, 2)
        .withCounter(TestReduceDriver.ReducerWithCounters.Counters.SUM, 3)
        .withCounter("category", "count", 2).withCounter("category", "sum", 3)
        .runTest(false);
  }

  @Test
  public void testWithCounterAndEnumCounterMissing() throws IOException {
    MultipleInputsMapReduceDriver<Text, Text, Text, Text> testDriver = new MultipleInputsMapReduceDriver<Text, Text, Text, Text>();

    thrown
        .expectAssertionErrorMessage("2 Error(s): (Actual counter ("
            + "\"org.apache.hadoop.mrunit.TestMultipleInputsMapReduceDriver$TokenMapperWithCounters$Counters\",\"Y\")"
            + " was not found in expected counters, Actual counter ("
            + "\"org.apache.hadoop.mrunit.TestMapDriver$MapperWithCounters$Counters\",\"X\")"
            + " was not found in expected counters");

    Mapper<Text, Text, Text, Text> mapperWithCounters = new TestMapDriver.MapperWithCounters<Text, Text, Text, Text>();
    Mapper<Text, Text, Text, Text> tokenMapperWithCounters = new TokenMapperWithCounters();

    testDriver
        .withMapper(mapperWithCounters)
        .withInput(mapperWithCounters, new Text("hie"), new Text("Hi"))
        .withMapper(tokenMapperWithCounters)
        .withInput(tokenMapperWithCounters, new Text("bie"),
            new Text("Goodbye Bye"))
        .withOutput(new Text("hie"), new Text("Hi"))
        .withOutput(new Text("bie"), new Text("Goodbye"))
        .withOutput(new Text("bie"), new Text("Bye"))
        .withStrictCounterChecking()
        .withCounter("category", "name", 3)
        .withReducer(
            new TestReduceDriver.ReducerWithCounters<Text, Text, Text, Text>())
        .withCounter(TestReduceDriver.ReducerWithCounters.Counters.COUNT, 2)
        .withCounter(TestReduceDriver.ReducerWithCounters.Counters.SUM, 3)
        .withCounter("category", "count", 2).withCounter("category", "sum", 3)
        .runTest(false);
  }

  @Test
  public void testWithCounterAndStringCounterMissing() throws IOException {
    MultipleInputsMapReduceDriver<Text, Text, Text, Text> testDriver = new MultipleInputsMapReduceDriver<Text, Text, Text, Text>();

    thrown.expectAssertionErrorMessage("1 Error(s): (Actual counter ("
        + "\"category\",\"name\")" + " was not found in expected counters");

    Mapper<Text, Text, Text, Text> mapperWithCounters = new TestMapDriver.MapperWithCounters<Text, Text, Text, Text>();
    Mapper<Text, Text, Text, Text> tokenMapperWithCounters = new TokenMapperWithCounters();

    testDriver
        .withMapper(mapperWithCounters)
        .withInput(mapperWithCounters, new Text("hie"), new Text("Hi"))
        .withMapper(tokenMapperWithCounters)
        .withInput(tokenMapperWithCounters, new Text("bie"),
            new Text("Goodbye Bye"))
        .withOutput(new Text("hie"), new Text("Hi"))
        .withOutput(new Text("bie"), new Text("Goodbye"))
        .withOutput(new Text("bie"), new Text("Bye"))
        .withStrictCounterChecking()
        .withCounter(TestMapDriver.MapperWithCounters.Counters.X, 1)
        .withCounter(TokenMapperWithCounters.Counters.Y, 2)
        .withReducer(
            new TestReduceDriver.ReducerWithCounters<Text, Text, Text, Text>())
        .withCounter(TestReduceDriver.ReducerWithCounters.Counters.COUNT, 2)
        .withCounter(TestReduceDriver.ReducerWithCounters.Counters.SUM, 3)
        .withCounter("category", "count", 2).withCounter("category", "sum", 3)
        .runTest(false);
  }

  @Test
  public void testWithFailedCounter() throws IOException {
    MultipleInputsMapReduceDriver<Text, Text, Text, Text> testDriver = new MultipleInputsMapReduceDriver<Text, Text, Text, Text>();

    thrown
        .expectAssertionErrorMessage("3 Error(s): ("
            + "Counter org.apache.hadoop.mrunit.TestMapDriver.MapperWithCounters.Counters.X has value 1 instead of expected 20, "
            + "Counter org.apache.hadoop.mrunit.TestMultipleInputsMapReduceDriver.TokenMapperWithCounters.Counters.Y has value 2 instead of expected 30, "
            + "Counter with category category and name name has value 3 instead of expected 20)");

    Mapper<Text, Text, Text, Text> mapperWithCounters = new TestMapDriver.MapperWithCounters<Text, Text, Text, Text>();
    Mapper<Text, Text, Text, Text> tokenMapperWithCounters = new TokenMapperWithCounters();

    testDriver
        .withMapper(mapperWithCounters)
        .withInput(mapperWithCounters, new Text("hie"), new Text("Hi"))
        .withMapper(tokenMapperWithCounters)
        .withInput(tokenMapperWithCounters, new Text("bie"),
            new Text("Goodbye Bye"))
        .withOutput(new Text("hie"), new Text("Hi"))
        .withOutput(new Text("bie"), new Text("Goodbye"))
        .withOutput(new Text("bie"), new Text("Bye"))
        .withCounter(TestMapDriver.MapperWithCounters.Counters.X, 20)
        .withCounter(TokenMapperWithCounters.Counters.Y, 30)
        .withReducer(
            new TestReduceDriver.ReducerWithCounters<Text, Text, Text, Text>())
        .withCounter("category", "name", 20).runTest(false);
  }

  @Test
  public void testJavaSerialization() throws IOException {
    final Configuration conf = new Configuration();
    conf.setStrings("io.serializations", conf.get("io.serializations"),
        "org.apache.hadoop.io.serializer.JavaSerialization");
    final MultipleInputsMapReduceDriver<Integer, IntWritable, Integer, IntWritable> testDriver = MultipleInputsMapReduceDriver
        .newMultipleInputMapReduceDriver(
                new IdentityReducer<Integer, IntWritable>())
        .withConfiguration(conf);
    Mapper<Integer, IntWritable, Integer, IntWritable> identityMapper = new IdentityMapper<Integer, IntWritable>();
    Mapper<Integer, IntWritable, Integer, IntWritable> anotherIdentityMapper = new IdentityMapper<Integer, IntWritable>();
    testDriver.addMapper(identityMapper);
    testDriver.withInput(identityMapper, 1, new IntWritable(2)).withInput(
        identityMapper, 2, new IntWritable(3));
    testDriver.addMapper(anotherIdentityMapper);
    testDriver.withInput(anotherIdentityMapper, 3, new IntWritable(4))
        .withInput(anotherIdentityMapper, 4, new IntWritable(5));
    testDriver
        .withKeyOrderComparator(new JavaSerializationComparator<Integer>());
    testDriver
        .withKeyGroupingComparator(TestMapReduceDriver.INTEGER_COMPARATOR);

    testDriver.withOutput(1, new IntWritable(2))
        .withOutput(2, new IntWritable(3)).withOutput(3, new IntWritable(4))
        .withOutput(4, new IntWritable(5));
    testDriver.runTest(false);
  }

  @Test
  public void testCopy() throws IOException {
    final Text key = new Text("a");
    final LongWritable value = new LongWritable(1);
    driver.addInput(mapper, key, value);
    key.set("b");
    value.set(2);
    driver.addInput(mapper, key, value);

    key.set("a");
    value.set(1);
    driver.addOutput(key, value);
    key.set("b");
    value.set(2);
    driver.addOutput(key, value);

    final LongWritable longKey = new LongWritable(3);
    final Text textValue = new Text("c d");
    driver.addInput(tokenMapper, longKey, textValue);
    longKey.set(4);
    textValue.set("e f g");
    driver.addInput(tokenMapper, longKey, textValue);

    key.set("c");
    value.set(3);
    driver.addOutput(key, value);
    key.set("d");
    value.set(3);
    driver.addOutput(key, value);
    key.set("e");
    value.set(4);
    driver.addOutput(key, value);
    key.set("f");
    value.set(4);
    driver.addOutput(key, value);
    key.set("g");
    value.set(4);
    driver.addOutput(key, value);

    driver.runTest(false);
  }

  @Test
  public void testOutputFormat() throws IOException {
    driver.withInputFormat(SequenceFileInputFormat.class);
    driver.withOutputFormat(SequenceFileOutputFormat.class);
    driver.withInput(mapper, new Text("a"), new LongWritable(1));
    driver.withInput(mapper, new Text("a"), new LongWritable(2));
    driver.withInput(tokenMapper, new LongWritable(3), new Text("a b"));
    driver.withOutput(new Text("a"), new LongWritable(6));
    driver.withOutput(new Text("b"), new LongWritable(3));
    driver.runTest(false);
  }

  @SuppressWarnings({ "unchecked", "rawtypes" })
  @Test
  public void testOutputFormatWithMismatchInOutputClasses() throws IOException {
    final MultipleInputsMapReduceDriver testDriver = this.driver;
    testDriver.withInputFormat(TextInputFormat.class);
    testDriver.withOutputFormat(TextOutputFormat.class);
    testDriver.withInput(mapper, new Text("a"), new LongWritable(1));
    testDriver.withInput(mapper, new Text("a"), new LongWritable(2));
    testDriver.withInput(tokenMapper, new LongWritable(3), new Text("a b"));
    testDriver.withOutput(new LongWritable(0), new Text("a\t6"));
    testDriver.withOutput(new LongWritable(4), new Text("b\t3"));
    testDriver.runTest(false);
  }

  @Test
  public void testMapInputFile() throws IOException {
    MultipleInputsMapReduceDriver<Text, LongWritable, Text, LongWritable> testDriver = new MultipleInputsMapReduceDriver<Text, LongWritable, Text, LongWritable>(
        reducer);

    InputPathStoringMapper<LongWritable, LongWritable> inputPathStoringMapper = new InputPathStoringMapper<LongWritable, LongWritable>();
    Path mapInputPath = new Path("myfile");
    testDriver.addMapper(inputPathStoringMapper);
    testDriver.setMapInputPath(inputPathStoringMapper, mapInputPath);
    assertEquals(mapInputPath.getName(),
        testDriver.getMapInputPath(inputPathStoringMapper).getName());
    testDriver.withInput(inputPathStoringMapper, new Text("a"),
        new LongWritable(1));

    InputPathStoringMapper<LongWritable, LongWritable> anotherInputPathStoringMapper = new InputPathStoringMapper<LongWritable, LongWritable>();
    Path anotherMapInputPath = new Path("myotherfile");
    testDriver.addMapper(anotherInputPathStoringMapper);
    testDriver.setMapInputPath(anotherInputPathStoringMapper,
        anotherMapInputPath);
    assertEquals(anotherMapInputPath.getName(),
        testDriver.getMapInputPath(anotherInputPathStoringMapper).getName());
    testDriver.withInput(anotherInputPathStoringMapper, new Text("b"),
        new LongWritable(2));

    testDriver.runTest(false);
    assertNotNull(inputPathStoringMapper.getMapInputPath());
    assertEquals(mapInputPath.getName(), inputPathStoringMapper
        .getMapInputPath().getName());
  }

  @Test
  public void testGroupComparatorBehaviorFirst() throws IOException {
    driver
        .withInput(mapper, new Text("A1"), new LongWritable(1L))
        .withInput(mapper, new Text("A2"), new LongWritable(1L))
        .withInput(mapper, new Text("B1"), new LongWritable(1L))
        .withInput(mapper, new Text("B2"), new LongWritable(1L))
        .withInput(mapper, new Text("C1"), new LongWritable(1L))
        .withInput(tokenMapper, new LongWritable(3L), new Text("D1 D2 E1"))
        .withOutput(new Text("A1"), new LongWritable(2L))
        .withOutput(new Text("B1"), new LongWritable(2L))
        .withOutput(new Text("C1"), new LongWritable(1L))
        .withOutput(new Text("D1"), new LongWritable(6L))
        .withOutput(new Text("E1"), new LongWritable(3L))
        .withKeyGroupingComparator(
            new TestMapReduceDriver.FirstCharComparator()).runTest(false);
  }

  @Test
  public void testGroupComparatorBehaviorSecond() throws IOException {
    driver
        .withInput(mapper, new Text("1A"), new LongWritable(1L))
        .withInput(mapper, new Text("2A"), new LongWritable(1L))
        .withInput(mapper, new Text("1B"), new LongWritable(1L))
        .withInput(mapper, new Text("2B"), new LongWritable(1L))
        .withInput(mapper, new Text("1C"), new LongWritable(1L))
        .withInput(tokenMapper, new LongWritable(2L), new Text("1D 2D 1E"))
        .withOutput(new Text("1A"), new LongWritable(1L))
        .withOutput(new Text("2A"), new LongWritable(1L))
        .withOutput(new Text("1B"), new LongWritable(1L))
        .withOutput(new Text("2B"), new LongWritable(1L))
        .withOutput(new Text("1C"), new LongWritable(1L))
        .withOutput(new Text("1D"), new LongWritable(2L))
        .withOutput(new Text("2D"), new LongWritable(2L))
        .withOutput(new Text("1E"), new LongWritable(2L))
        .withKeyGroupingComparator(
            new TestMapReduceDriver.SecondCharComparator()).runTest(false);
  }

  @Test
  public void testGroupingComparatorSpecifiedByConf() throws IOException {
    JobConf conf = new JobConf(new Configuration());
    conf.setOutputValueGroupingComparator(TestMapReduceDriver.FirstCharComparator.class);
    driver.withInput(mapper, new Text("A1"), new LongWritable(1L))
        .withInput(mapper, new Text("A2"), new LongWritable(1L))
        .withInput(mapper, new Text("B1"), new LongWritable(1L))
        .withInput(mapper, new Text("B2"), new LongWritable(1L))
        .withInput(mapper, new Text("C1"), new LongWritable(1L))
        .withInput(tokenMapper, new LongWritable(3L), new Text("D1 D2 E1"))
        .withOutput(new Text("A1"), new LongWritable(2L))
        .withOutput(new Text("B1"), new LongWritable(2L))
        .withOutput(new Text("C1"), new LongWritable(1L))
        .withOutput(new Text("D1"), new LongWritable(6L))
        .withOutput(new Text("E1"), new LongWritable(3L))
        .withConfiguration(conf).runTest(false);
  }

  @SuppressWarnings("unchecked")
  @Test
  public void testUseOfWritableRegisteredComparator() throws IOException {
    MultipleInputsMapReduceDriver<TestWritable, Text, TestWritable, Text> testDriver = new MultipleInputsMapReduceDriver<TestWritable, Text, TestWritable, Text>(
        new IdentityReducer<TestWritable, Text>());

    IdentityMapper<TestWritable, Text> identityMapper = new IdentityMapper<TestWritable, Text>();
    IdentityMapper<TestWritable, Text> anotherIdentityMapper = new IdentityMapper<TestWritable, Text>();
    testDriver.addMapper(identityMapper);
    testDriver.addMapper(anotherIdentityMapper);
    testDriver
        .withInput(identityMapper, new TestWritable("A1"), new Text("A1"))
        .withInput(identityMapper, new TestWritable("A2"), new Text("A2"))
        .withInput(identityMapper, new TestWritable("A3"), new Text("A3"))
        .withInput(anotherIdentityMapper, new TestWritable("B1"),
            new Text("B1"))
        .withInput(anotherIdentityMapper, new TestWritable("B2"),
            new Text("B2"))
        .withKeyGroupingComparator(new TestWritable.SingleGroupComparator())
        .withOutput(new TestWritable("B2"), new Text("B2"))
        .withOutput(new TestWritable("B2"), new Text("B1"))
        .withOutput(new TestWritable("B2"), new Text("A3"))
        .withOutput(new TestWritable("B2"), new Text("A2"))
        .withOutput(new TestWritable("B2"), new Text("A1")).runTest(true); // ordering
                                                                           // is
                                                                           // important
  }

  static class TokenMapperWithCounters extends MapReduceBase implements
      Mapper<Text, Text, Text, Text> {
    private final Text output = new Text();

    @Override
    public void map(Text key, Text value,
        OutputCollector<Text, Text> collector, Reporter reporter)
        throws IOException {
      String[] tokens = value.toString().split("\\s");
      for (String token : tokens) {
        output.set(token);
        collector.collect(key, output);
        reporter.getCounter(Counters.Y).increment(1);
        reporter.getCounter("category", "name").increment(1);
      }
    }

    public static enum Counters {
      Y
    }
  }

  static class TokenMapper extends MapReduceBase implements
      Mapper<LongWritable, Text, Text, LongWritable> {
    private final Text output = new Text();

    @Override
    public void map(LongWritable longWritable, Text text,
        OutputCollector<Text, LongWritable> textLongWritableOutputCollector,
        Reporter reporter) throws IOException {
      String[] tokens = text.toString().split("\\s");
      for (String token : tokens) {
        output.set(token);
        textLongWritableOutputCollector.collect(output, longWritable);
      }
    }
  }

  static class ReverseIdentityMapper<KEYIN, VALUEIN> extends MapReduceBase
      implements Mapper<KEYIN, VALUEIN, VALUEIN, KEYIN> {
    @Override
    public void map(KEYIN key, VALUEIN value,
        OutputCollector<VALUEIN, KEYIN> vkOutputCollector, Reporter reporter)
        throws IOException {
      vkOutputCollector.collect(value, key);
    }
  }
}
