| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.hadoop.mapred; |
| |
| import java.io.DataOutputStream; |
| import java.io.IOException; |
| |
| import junit.extensions.TestSetup; |
| import junit.framework.Test; |
| import junit.framework.TestCase; |
| import junit.framework.TestSuite; |
| |
| import org.apache.hadoop.conf.Configuration; |
| import org.apache.hadoop.hdfs.MiniDFSCluster; |
| import org.apache.hadoop.io.BytesWritable; |
| import org.apache.hadoop.io.LongWritable; |
| import org.apache.hadoop.io.Text; |
| import org.apache.hadoop.mapred.lib.IdentityMapper; |
| import org.apache.hadoop.mapred.lib.IdentityReducer; |
| import org.apache.hadoop.mapred.lib.NullOutputFormat; |
| import org.apache.hadoop.fs.FileSystem; |
| import org.apache.hadoop.fs.Path; |
| import org.apache.hadoop.util.ToolRunner; |
| import org.apache.hadoop.examples.RandomWriter; |
| import org.apache.hadoop.examples.Sort; |
| |
| /** |
| * A JUnit test to test the Map-Reduce framework's sort |
| * with a Mini Map-Reduce Cluster with a Mini HDFS Clusters. |
| */ |
| public class TestMiniMRDFSSort extends TestCase { |
| // Input/Output paths for sort |
| private static final Path SORT_INPUT_PATH = new Path("/sort/input"); |
| private static final Path SORT_OUTPUT_PATH = new Path("/sort/output"); |
| |
// Knobs to control randomwriter, and hence sort
| private static final int NUM_HADOOP_SLAVES = 3; |
| // make it big enough to cause a spill in the map |
| private static final int RW_BYTES_PER_MAP = 3 * 1024 * 1024; |
| private static final int RW_MAPS_PER_HOST = 2; |
| |
| private static MiniMRCluster mrCluster = null; |
| private static MiniDFSCluster dfsCluster = null; |
| private static FileSystem dfs = null; |
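
/**
 * Wraps the suite in a fixture that starts the mini HDFS and
 * Map-Reduce clusters once, before the first test, and shuts
 * them down after the last one.
 */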
| public static Test suite() { |
| TestSetup setup = new TestSetup(new TestSuite(TestMiniMRDFSSort.class)) { |
| protected void setUp() throws Exception { |
| Configuration conf = new Configuration(); |
| dfsCluster = new MiniDFSCluster(conf, NUM_HADOOP_SLAVES, true, null); |
| dfs = dfsCluster.getFileSystem(); |
| mrCluster = new MiniMRCluster(NUM_HADOOP_SLAVES, |
| dfs.getUri().toString(), 1); |
| } |
| protected void tearDown() throws Exception { |
| if (dfsCluster != null) { dfsCluster.shutdown(); } |
| if (mrCluster != null) { mrCluster.shutdown(); } |
| } |
| }; |
| return setup; |
| } |
| |
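/**
 * Runs RandomWriter with scaled-down settings to generate the
 * input data for the sort under the given path.
 */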
| public static void runRandomWriter(JobConf job, Path sortInput) |
| throws Exception { |
// Scale down the default settings for RandomWriter for the test-case
// Generates NUM_HADOOP_SLAVES * RW_MAPS_PER_HOST maps, each writing
// RW_BYTES_PER_MAP bytes
| job.setInt(RandomWriter.BYTES_PER_MAP, RW_BYTES_PER_MAP); |
| job.setInt(RandomWriter.MAPS_PER_HOST, RW_MAPS_PER_HOST); |
| String[] rwArgs = {sortInput.toString()}; |
| |
| // Run RandomWriter |
assertEquals(0, ToolRunner.run(job, new RandomWriter(), rwArgs));
| } |
| |
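/**
 * Runs Sort with unlimited JVM reuse and a 1 MB sort buffer (so the
 * maps spill), then checks that the HDFS bytes read stay within 10%
 * of the map input bytes counter.
 */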
| private static void runSort(JobConf job, Path sortInput, Path sortOutput) |
| throws Exception { |
| |
| job.setInt(JobContext.JVM_NUMTASKS_TORUN, -1); |
| job.setInt(JobContext.IO_SORT_MB, 1); |
| job.setNumMapTasks(12); |
| |
| // Setup command-line arguments to 'sort' |
| String[] sortArgs = {sortInput.toString(), sortOutput.toString()}; |
| |
| // Run Sort |
| Sort sort = new Sort(); |
assertEquals(0, ToolRunner.run(job, sort, sortArgs));
| org.apache.hadoop.mapreduce.Counters counters = sort.getResult().getCounters(); |
| long mapInput = counters.findCounter( |
| org.apache.hadoop.mapreduce.lib.input.FileInputFormat.COUNTER_GROUP, |
| org.apache.hadoop.mapreduce.lib.input.FileInputFormat.BYTES_READ). |
| getValue(); |
| long hdfsRead = counters.findCounter(Task.FILESYSTEM_COUNTER_GROUP, |
| "HDFS_BYTES_READ").getValue(); |
// the HDFS bytes read should be at least 100% and under 110% of the
// map input bytes
| assertTrue("map input = " + mapInput + ", hdfs read = " + hdfsRead, |
| (hdfsRead < (mapInput * 1.1)) && |
| (hdfsRead >= mapInput)); |
| } |
| |
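/**
 * Runs the SortValidator to check that 'sort' ran correctly over the
 * given input and output directories.
 */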
| private static void runSortValidator(JobConf job, |
| Path sortInput, Path sortOutput) |
| throws Exception { |
| String[] svArgs = {"-sortInput", sortInput.toString(), |
| "-sortOutput", sortOutput.toString()}; |
| |
| // Run Sort-Validator |
| assertEquals(ToolRunner.run(job, new SortValidator(), svArgs), 0); |
| } |
| |
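/**
 * A map-only mapper used to detect JVM reuse: each instance bumps the
 * static 'instances' count in close() and adds it to the "jvm"/"use"
 * counter. Without reuse every JVM creates exactly one instance and
 * contributes 1, so the counter equals the number of maps; with reuse
 * a JVM's later instances contribute 2, 3, ... and the counter
 * exceeds the number of maps.
 */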
private static class ReuseDetector extends MapReduceBase
implements Mapper<BytesWritable, BytesWritable, Text, Text> {
| static int instances = 0; |
| Reporter reporter = null; |
| |
| @Override |
| public void map(BytesWritable key, BytesWritable value, |
| OutputCollector<Text, Text> output, |
| Reporter reporter) throws IOException { |
| this.reporter = reporter; |
| } |
| |
| public void close() throws IOException { |
| reporter.incrCounter("jvm", "use", ++instances); |
| } |
| } |
| |
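/**
 * Runs a map-only job over the sort input with JVM reuse either
 * unlimited (-1) or disabled (1 task per JVM) and asserts on the
 * resulting "jvm"/"use" counter.
 */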
| private static void runJvmReuseTest(JobConf job, |
| boolean reuse) throws IOException { |
// Set up a map-only job that reads the input and sets a counter
// based on how many times each JVM was reused.
| job.setInt(JobContext.JVM_NUMTASKS_TORUN, reuse ? -1 : 1); |
| FileInputFormat.setInputPaths(job, SORT_INPUT_PATH); |
| job.setInputFormat(SequenceFileInputFormat.class); |
| job.setOutputFormat(NullOutputFormat.class); |
| job.setMapperClass(ReuseDetector.class); |
| job.setOutputKeyClass(Text.class); |
| job.setOutputValueClass(Text.class); |
| job.setNumMapTasks(24); |
| job.setNumReduceTasks(0); |
| RunningJob result = JobClient.runJob(job); |
| long uses = result.getCounters().findCounter("jvm", "use").getValue(); |
| int maps = job.getNumMapTasks(); |
| if (reuse) { |
| assertTrue("maps = " + maps + ", uses = " + uses, maps < uses); |
| } else { |
| assertEquals("uses should be number of maps", job.getNumMapTasks(), uses); |
| } |
| } |
| |
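/**
 * End-to-end test: generate random input, sort it, and validate
 * the sorted output.
 */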
| public void testMapReduceSort() throws Exception { |
| // Run randomwriter to generate input for 'sort' |
| runRandomWriter(mrCluster.createJobConf(), SORT_INPUT_PATH); |
| |
| // Run sort |
| runSort(mrCluster.createJobConf(), SORT_INPUT_PATH, SORT_OUTPUT_PATH); |
| |
| // Run sort-validator to check if sort worked correctly |
| runSortValidator(mrCluster.createJobConf(), SORT_INPUT_PATH, |
| SORT_OUTPUT_PATH); |
| } |
| |
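/** JVM reuse enabled: the reuse counter must exceed the map count. */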
| public void testJvmReuse() throws Exception { |
| runJvmReuseTest(mrCluster.createJobConf(), true); |
| } |
| |
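/** JVM reuse disabled: the reuse counter must equal the map count. */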
| public void testNoJvmReuse() throws Exception { |
| runJvmReuseTest(mrCluster.createJobConf(), false); |
| } |
| |
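/**
 * A partitioner that deliberately returns an out-of-range partition:
 * -1 when "test.testmapred.badpartition" is true, numPartitions
 * (one past the last valid index) when it is false.
 */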
| private static class BadPartitioner |
| implements Partitioner<LongWritable,Text> { |
| boolean low; |
| public void configure(JobConf conf) { |
| low = conf.getBoolean("test.testmapred.badpartition", true); |
| } |
| public int getPartition(LongWritable k, Text v, int numPartitions) { |
| return low ? -1 : numPartitions; |
| } |
| } |
| |
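/**
 * Checks that the framework fails a job whose partitioner returns
 * a partition below 0 or at/above the number of reduces.
 */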
| public void testPartitioner() throws Exception { |
| JobConf conf = mrCluster.createJobConf(); |
| conf.setPartitionerClass(BadPartitioner.class); |
| conf.setNumReduceTasks(3); |
| FileSystem fs = FileSystem.get(conf); |
| Path testdir = |
| new Path("blah").makeQualified(fs.getUri(), fs.getWorkingDirectory()); |
| Path inFile = new Path(testdir, "blah"); |
| DataOutputStream f = fs.create(inFile); |
| f.writeBytes("blah blah blah\n"); |
| f.close(); |
| FileInputFormat.setInputPaths(conf, inFile); |
| FileOutputFormat.setOutputPath(conf, new Path(testdir, "out")); |
| conf.setMapperClass(IdentityMapper.class); |
| conf.setReducerClass(IdentityReducer.class); |
| conf.setOutputKeyClass(LongWritable.class); |
| conf.setOutputValueClass(Text.class); |
| conf.setMaxMapAttempts(1); |
| |
| // partition too low |
| conf.setBoolean("test.testmapred.badpartition", true); |
| boolean pass = true; |
| try { |
| JobClient.runJob(conf); |
| } catch (IOException e) { |
| pass = false; |
| } |
| assertFalse("should fail for partition < 0", pass); |
| |
| // partition too high |
| conf.setBoolean("test.testmapred.badpartition", false); |
| pass = true; |
| try { |
| JobClient.runJob(conf); |
| } catch (IOException e) { |
| pass = false; |
| } |
| assertFalse("should fail for partition >= numPartitions", pass); |
| } |
| |
| } |