| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.hadoop.mapred; |
| |
| import java.io.DataOutputStream; |
| import java.io.IOException; |
| |
| import org.apache.commons.logging.Log; |
| import org.apache.commons.logging.LogFactory; |
| import org.apache.hadoop.hdfs.MiniDFSCluster; |
| import org.apache.hadoop.fs.FileSystem; |
| import org.apache.hadoop.fs.Path; |
| import org.apache.hadoop.io.IntWritable; |
| import org.apache.hadoop.io.Text; |
| import org.apache.hadoop.mapreduce.MapReduceTestUtil; |
| import org.apache.hadoop.mapreduce.server.jobtracker.JTConfig; |
| import org.junit.Test; |
| import static org.junit.Assert.assertEquals; |
| import static org.junit.Assert.assertTrue; |
| import static org.junit.Assert.assertFalse; |
| |
| /** |
| * A JUnit test to test Job System Directory with Mini-DFS. |
| */ |
| public class TestJobSysDirWithDFS { |
| private static final Log LOG = |
| LogFactory.getLog(TestJobSysDirWithDFS.class.getName()); |
| |
| static final int NUM_MAPS = 10; |
| static final int NUM_SAMPLES = 100000; |
| |
| public static class TestResult { |
| public String output; |
| public RunningJob job; |
| TestResult(RunningJob job, String output) { |
| this.job = job; |
| this.output = output; |
| } |
| } |
| |
| public static TestResult launchWordCount(JobConf conf, |
| Path inDir, |
| Path outDir, |
| String input, |
| int numMaps, |
| int numReduces, |
| String sysDir) throws IOException { |
| FileSystem inFs = inDir.getFileSystem(conf); |
| FileSystem outFs = outDir.getFileSystem(conf); |
| outFs.delete(outDir, true); |
| if (!inFs.mkdirs(inDir)) { |
| throw new IOException("Mkdirs failed to create " + inDir.toString()); |
| } |
| { |
| DataOutputStream file = inFs.create(new Path(inDir, "part-0")); |
| file.writeBytes(input); |
| file.close(); |
| } |
| conf.setJobName("wordcount"); |
| conf.setInputFormat(TextInputFormat.class); |
| |
| // the keys are words (strings) |
| conf.setOutputKeyClass(Text.class); |
| // the values are counts (ints) |
| conf.setOutputValueClass(IntWritable.class); |
| |
| conf.setMapperClass(WordCount.MapClass.class); |
| conf.setCombinerClass(WordCount.Reduce.class); |
| conf.setReducerClass(WordCount.Reduce.class); |
| FileInputFormat.setInputPaths(conf, inDir); |
| FileOutputFormat.setOutputPath(conf, outDir); |
| conf.setNumMapTasks(numMaps); |
| conf.setNumReduceTasks(numReduces); |
| conf.set(JTConfig.JT_SYSTEM_DIR, "/tmp/subru/mapred/system"); |
| JobClient jobClient = new JobClient(conf); |
| RunningJob job = jobClient.runJob(conf); |
| // Checking that the Job Client system dir is not used |
| assertFalse(FileSystem.get(conf).exists( |
| new Path(conf.get(JTConfig.JT_SYSTEM_DIR)))); |
| // Check if the Job Tracker system dir is propogated to client |
| assertFalse(sysDir.contains("/tmp/subru/mapred/system")); |
| assertTrue(sysDir.contains("custom")); |
| return new TestResult(job, MapReduceTestUtil.readOutput(outDir, conf)); |
| } |
| |
| static void runWordCount(MiniMRCluster mr, JobConf jobConf, String sysDir) |
| throws IOException { |
| LOG.info("runWordCount"); |
| // Run a word count example |
| // Keeping tasks that match this pattern |
| TestResult result; |
| final Path inDir = new Path("./wc/input"); |
| final Path outDir = new Path("./wc/output"); |
| result = launchWordCount(jobConf, inDir, outDir, |
| "The quick brown fox\nhas many silly\n" + |
| "red fox sox\n", |
| 3, 1, sysDir); |
| assertEquals("The\t1\nbrown\t1\nfox\t2\nhas\t1\nmany\t1\n" + |
| "quick\t1\nred\t1\nsilly\t1\nsox\t1\n", result.output); |
| // Checking if the Job ran successfully in spite of different system dir config |
| // between Job Client & Job Tracker |
| assertTrue(result.job.isSuccessful()); |
| } |
| @Test |
| public void testWithDFS() throws IOException { |
| MiniDFSCluster dfs = null; |
| MiniMRCluster mr = null; |
| FileSystem fileSys = null; |
| try { |
| final int taskTrackers = 4; |
| |
| JobConf conf = new JobConf(); |
| conf.set(JTConfig.JT_SYSTEM_DIR, "/tmp/custom/mapred/system"); |
| dfs = new MiniDFSCluster.Builder(conf).numDataNodes(4).build(); |
| fileSys = dfs.getFileSystem(); |
| mr = new MiniMRCluster(taskTrackers, fileSys.getUri().toString(), 1, null, null, conf); |
| |
| runWordCount(mr, mr.createJobConf(), conf.get("mapred.system.dir")); |
| } finally { |
| if (dfs != null) { dfs.shutdown(); } |
| if (mr != null) { mr.shutdown(); |
| } |
| } |
| } |
| |
| } |