blob: d8d8cea87b47a415526c54b88ddac5913f39f120 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.mapreduce;
import org.junit.Test;
import static org.junit.Assert.*;
import static org.mockito.Mockito.*;
import org.apache.hadoop.fs.LocalDirAllocator;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.mapred.Task.CombineOutputCollector;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.util.Progress;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.mapreduce.task.reduce.Shuffle;
import org.apache.hadoop.mapred.Counters.Counter;
import org.apache.hadoop.mapred.MapOutputFile;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.Task;
import org.apache.hadoop.mapred.ReduceTask;
import org.apache.hadoop.mapred.TaskStatus;
import org.apache.hadoop.mapred.TaskUmbilicalProtocol;
import org.apache.hadoop.mapred.ShuffleConsumerPlugin;
import org.apache.hadoop.mapred.RawKeyValueIterator;
/**
* A JUnit for testing availability and accessibility of shuffle related API.
* It is needed for maintaining comptability with external sub-classes of
* ShuffleConsumerPlugin and AuxiliaryService(s) like ShuffleHandler.
*
* The importance of this test is for preserving API with 3rd party plugins.
*/
public class TestShufflePlugin<K, V> {
static class TestShuffleConsumerPlugin<K, V> implements ShuffleConsumerPlugin<K, V> {
@Override
public void init(ShuffleConsumerPlugin.Context<K, V> context) {
// just verify that Context has kept its public interface
context.getReduceId();
context.getJobConf();
context.getLocalFS();
context.getUmbilical();
context.getLocalDirAllocator();
context.getReporter();
context.getCodec();
context.getCombinerClass();
context.getCombineCollector();
context.getSpilledRecordsCounter();
context.getReduceCombineInputCounter();
context.getShuffledMapsCounter();
context.getReduceShuffleBytes();
context.getFailedShuffleCounter();
context.getMergedMapOutputsCounter();
context.getStatus();
context.getCopyPhase();
context.getMergePhase();
context.getReduceTask();
context.getMapOutputFile();
}
@Override
public void close(){
}
@Override
public RawKeyValueIterator run() throws java.io.IOException, java.lang.InterruptedException{
return null;
}
}
@Test
/**
* A testing method instructing core hadoop to load an external ShuffleConsumerPlugin
* as if it came from a 3rd party.
*/
public void testPluginAbility() {
try{
// create JobConf with mapreduce.job.shuffle.consumer.plugin=TestShuffleConsumerPlugin
JobConf jobConf = new JobConf();
jobConf.setClass(MRConfig.SHUFFLE_CONSUMER_PLUGIN,
TestShufflePlugin.TestShuffleConsumerPlugin.class,
ShuffleConsumerPlugin.class);
ShuffleConsumerPlugin shuffleConsumerPlugin = null;
Class<? extends ShuffleConsumerPlugin> clazz =
jobConf.getClass(MRConfig.SHUFFLE_CONSUMER_PLUGIN, Shuffle.class, ShuffleConsumerPlugin.class);
assertNotNull("Unable to get " + MRConfig.SHUFFLE_CONSUMER_PLUGIN, clazz);
// load 3rd party plugin through core's factory method
shuffleConsumerPlugin = ReflectionUtils.newInstance(clazz, jobConf);
assertNotNull("Unable to load " + MRConfig.SHUFFLE_CONSUMER_PLUGIN, shuffleConsumerPlugin);
}
catch (Exception e) {
assertTrue("Threw exception:" + e, false);
}
}
@Test
/**
* A testing method verifying availability and accessibility of API that is needed
* for sub-classes of ShuffleConsumerPlugin
*/
public void testConsumerApi() {
JobConf jobConf = new JobConf();
ShuffleConsumerPlugin<K, V> shuffleConsumerPlugin = new TestShuffleConsumerPlugin<K, V>();
//mock creation
ReduceTask mockReduceTask = mock(ReduceTask.class);
TaskUmbilicalProtocol mockUmbilical = mock(TaskUmbilicalProtocol.class);
Reporter mockReporter = mock(Reporter.class);
FileSystem mockFileSystem = mock(FileSystem.class);
Class<? extends org.apache.hadoop.mapred.Reducer> combinerClass = jobConf.getCombinerClass();
@SuppressWarnings("unchecked") // needed for mock with generic
CombineOutputCollector<K, V> mockCombineOutputCollector =
(CombineOutputCollector<K, V>) mock(CombineOutputCollector.class);
org.apache.hadoop.mapreduce.TaskAttemptID mockTaskAttemptID =
mock(org.apache.hadoop.mapreduce.TaskAttemptID.class);
LocalDirAllocator mockLocalDirAllocator = mock(LocalDirAllocator.class);
CompressionCodec mockCompressionCodec = mock(CompressionCodec.class);
Counter mockCounter = mock(Counter.class);
TaskStatus mockTaskStatus = mock(TaskStatus.class);
Progress mockProgress = mock(Progress.class);
MapOutputFile mockMapOutputFile = mock(MapOutputFile.class);
Task mockTask = mock(Task.class);
try {
String [] dirs = jobConf.getLocalDirs();
// verify that these APIs are available through super class handler
ShuffleConsumerPlugin.Context<K, V> context =
new ShuffleConsumerPlugin.Context<K, V>(mockTaskAttemptID, jobConf, mockFileSystem,
mockUmbilical, mockLocalDirAllocator,
mockReporter, mockCompressionCodec,
combinerClass, mockCombineOutputCollector,
mockCounter, mockCounter, mockCounter,
mockCounter, mockCounter, mockCounter,
mockTaskStatus, mockProgress, mockProgress,
mockTask, mockMapOutputFile, null);
shuffleConsumerPlugin.init(context);
shuffleConsumerPlugin.run();
shuffleConsumerPlugin.close();
}
catch (Exception e) {
assertTrue("Threw exception:" + e, false);
}
// verify that these APIs are available for 3rd party plugins
mockReduceTask.getTaskID();
mockReduceTask.getJobID();
mockReduceTask.getNumMaps();
mockReduceTask.getPartition();
mockReporter.progress();
}
@Test
/**
* A testing method verifying availability and accessibility of API needed for
* AuxiliaryService(s) which are "Shuffle-Providers" (ShuffleHandler and 3rd party plugins)
*/
public void testProviderApi() {
LocalDirAllocator mockLocalDirAllocator = mock(LocalDirAllocator.class);
JobConf mockJobConf = mock(JobConf.class);
try {
mockLocalDirAllocator.getLocalPathToRead("", mockJobConf);
}
catch (Exception e) {
assertTrue("Threw exception:" + e, false);
}
}
}