| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| /** |
| * TestJobInProgress is a unit test to test consistency of JobInProgress class |
| * data structures under different conditions (speculation/locality) and at |
| * different stages (tasks are running/pending/killed) |
| */ |
| |
| package org.apache.hadoop.mapred; |
| |
| import java.io.IOException; |
| import java.util.HashSet; |
| import java.util.Map; |
| import java.util.Set; |
| |
| import junit.extensions.TestSetup; |
| import junit.framework.Test; |
| import junit.framework.TestCase; |
| import junit.framework.TestSuite; |
| import static org.mockito.Mockito.*; |
| |
| import org.apache.commons.logging.Log; |
| import org.apache.commons.logging.LogFactory; |
| import org.apache.hadoop.mapred.FakeObjectUtilities.FakeJobInProgress; |
| import org.apache.hadoop.mapred.FakeObjectUtilities.FakeJobTracker; |
| import org.apache.hadoop.mapred.TaskStatus.Phase; |
| import org.apache.hadoop.mapred.UtilsForTests.FakeClock; |
| import org.apache.hadoop.mapreduce.Cluster.JobTrackerStatus; |
| import org.apache.hadoop.mapreduce.JobCounter; |
| import org.apache.hadoop.mapreduce.TaskType; |
| import org.apache.hadoop.mapreduce.server.jobtracker.JTConfig; |
| import org.apache.hadoop.mapreduce.split.JobSplit.TaskSplitMetaInfo; |
| import org.apache.hadoop.net.DNSToSwitchMapping; |
| import org.apache.hadoop.net.Node; |
| import org.apache.hadoop.net.StaticMapping; |
| |
| @SuppressWarnings("deprecation") |
| public class TestJobInProgress extends TestCase { |
| static final Log LOG = LogFactory.getLog(TestJobInProgress.class); |
| |
| static FakeJobTracker jobTracker; |
| |
| static String trackers[] = new String[] { |
| "tracker_tracker1.r1.com:1000", |
| "tracker_tracker2.r1.com:1000", |
| "tracker_tracker3.r2.com:1000", |
| "tracker_tracker4.r3.com:1000" |
| }; |
| |
| static String[] hosts = new String[] { |
| "tracker1.r1.com", |
| "tracker2.r1.com", |
| "tracker3.r2.com", |
| "tracker4.r3.com" |
| }; |
| |
| static String[] racks = new String[] { "/r1", "/r1", "/r2", "/r3" }; |
| |
| static int numUniqueHosts = hosts.length; |
| static int clusterSize = trackers.length; |
| |
| public static Test suite() { |
| TestSetup setup = new TestSetup(new TestSuite(TestJobInProgress.class)) { |
| protected void setUp() throws Exception { |
| JobConf conf = new JobConf(); |
| conf.set(JTConfig.JT_IPC_ADDRESS, "localhost:0"); |
| conf.set(JTConfig.JT_HTTP_ADDRESS, "0.0.0.0:0"); |
| conf.setClass("topology.node.switch.mapping.impl", |
| StaticMapping.class, DNSToSwitchMapping.class); |
| jobTracker = new FakeJobTracker(conf, new FakeClock(), trackers); |
| // Set up the Topology Information |
| for (int i = 0; i < hosts.length; i++) { |
| StaticMapping.addNodeToRack(hosts[i], racks[i]); |
| } |
| for (String s: trackers) { |
| FakeObjectUtilities.establishFirstContact(jobTracker, s); |
| } |
| } |
| }; |
| return setup; |
| } |
| |
| static class MyFakeJobInProgress extends FakeJobInProgress { |
| |
| MyFakeJobInProgress(JobConf jc, JobTracker jt) throws IOException { |
| super(jc, jt); |
| } |
| |
| @Override |
| TaskSplitMetaInfo[] createSplits(org.apache.hadoop.mapreduce.JobID jobId) { |
| // Set all splits to reside on one host. This will ensure that |
| // one tracker gets data local, one gets rack local and two others |
| // get non-local maps |
| TaskSplitMetaInfo[] splits = new TaskSplitMetaInfo[numMapTasks]; |
| String[] splitHosts0 = new String[] { hosts[0] }; |
| for (int i = 0; i < numMapTasks; i++) { |
| splits[i] = new TaskSplitMetaInfo(splitHosts0, 0, 0); |
| } |
| return splits; |
| } |
| |
| private void makeRunning(TaskAttemptID taskId, TaskInProgress tip, |
| String taskTracker) { |
| TaskStatus status = TaskStatus.createTaskStatus(tip.isMapTask(), taskId, |
| 0.0f, 1, TaskStatus.State.RUNNING, "", "", taskTracker, |
| tip.isMapTask() ? Phase.MAP : Phase.REDUCE, new Counters()); |
| updateTaskStatus(tip, status); |
| } |
| |
| private TaskInProgress getTipForTaskID(TaskAttemptID tid, boolean isMap) { |
| TaskInProgress result = null; |
| TaskID id = tid.getTaskID(); |
| TaskInProgress[] arrayToLook = isMap ? maps : reduces; |
| |
| for (int i = 0; i < arrayToLook.length; i++) { |
| TaskInProgress tip = arrayToLook[i]; |
| if (tip.getTIPId() == id) { |
| result = tip; |
| break; |
| } |
| } |
| return result; |
| } |
| |
| /** |
| * Find a new Map or a reduce task and mark it as running on the specified |
| * tracker |
| */ |
| public TaskAttemptID findAndRunNewTask(boolean isMap, |
| String tt, String host, |
| int clusterSize, |
| int numUniqueHosts) |
| throws IOException { |
| TaskTrackerStatus tts = new TaskTrackerStatus(tt, host); |
| Task task = isMap ? |
| obtainNewMapTask(tts, clusterSize, numUniqueHosts) : |
| obtainNewReduceTask(tts, clusterSize, numUniqueHosts); |
| TaskAttemptID tid = task.getTaskID(); |
| makeRunning(task.getTaskID(), getTipForTaskID(tid, isMap), tt); |
| return tid; |
| } |
| } |
| |
| public void testPendingMapTaskCount() throws Exception { |
| |
| int numMaps = 4; |
| int numReds = 4; |
| |
| JobConf conf = new JobConf(); |
| conf.setNumMapTasks(numMaps); |
| conf.setNumReduceTasks(numReds); |
| conf.setSpeculativeExecution(false); |
| conf.setBoolean( |
| JobContext.SETUP_CLEANUP_NEEDED, false); |
| MyFakeJobInProgress job1 = new MyFakeJobInProgress(conf, jobTracker); |
| job1.initTasks(); |
| |
| TaskAttemptID[] tid = new TaskAttemptID[numMaps]; |
| |
| for (int i = 0; i < numMaps; i++) { |
| tid[i] = job1.findAndRunNewTask(true, trackers[i], hosts[i], |
| clusterSize, numUniqueHosts); |
| } |
| |
| // Fail all maps |
| for (int i = 0; i < numMaps; i++) { |
| job1.failTask(tid[i]); |
| } |
| |
| MyFakeJobInProgress job2 = new MyFakeJobInProgress(conf, jobTracker); |
| job2.initTasks(); |
| |
| for (int i = 0; i < numMaps; i++) { |
| tid[i] = job2.findAndRunNewTask(true, trackers[i], hosts[i], |
| clusterSize, numUniqueHosts); |
| job2.finishTask(tid[i]); |
| } |
| |
| for (int i = 0; i < numReds/2; i++) { |
| tid[i] = job2.findAndRunNewTask(false, trackers[i], hosts[i], |
| clusterSize, numUniqueHosts); |
| } |
| |
| for (int i = 0; i < numReds/4; i++) { |
| job2.finishTask(tid[i]); |
| } |
| |
| for (int i = numReds/4; i < numReds/2; i++) { |
| job2.failTask(tid[i]); |
| } |
| |
| // Job1. All Maps have failed, no reduces have been scheduled |
| checkTaskCounts(job1, 0, numMaps, 0, numReds); |
| |
| // Job2. All Maps have completed. One reducer has completed, one has |
| // failed and two others have not been scheduled |
| checkTaskCounts(job2, 0, 0, 0, 3 * numReds / 4); |
| } |
| |
| /** |
| * Test if running tasks are correctly maintained for various types of jobs |
| */ |
| static void testRunningTaskCount(boolean speculation) throws Exception { |
| LOG.info("Testing running jobs with speculation : " + speculation); |
| |
| JobConf conf = new JobConf(); |
| conf.setNumMapTasks(2); |
| conf.setNumReduceTasks(2); |
| conf.setSpeculativeExecution(speculation); |
| MyFakeJobInProgress jip = new MyFakeJobInProgress(conf, jobTracker); |
| jip.initTasks(); |
| |
| TaskAttemptID[] tid = new TaskAttemptID[4]; |
| |
| for (int i = 0; i < 2; i++) { |
| tid[i] = jip.findAndRunNewTask(true, trackers[i], hosts[i], |
| clusterSize, numUniqueHosts); |
| } |
| |
| // check if the running structures are populated |
| Set<TaskInProgress> uniqueTasks = new HashSet<TaskInProgress>(); |
| for (Map.Entry<Node, Set<TaskInProgress>> s : |
| jip.getRunningMapCache().entrySet()) { |
| uniqueTasks.addAll(s.getValue()); |
| } |
| |
| // add non local map tasks |
| uniqueTasks.addAll(jip.getNonLocalRunningMaps()); |
| |
| assertEquals("Running map count doesnt match for jobs with speculation " |
| + speculation, |
| jip.runningMaps(), uniqueTasks.size()); |
| |
| for (int i = 0; i < 2; i++ ) { |
| tid[i] = jip.findAndRunNewTask(false, trackers[i], hosts[i], |
| clusterSize, numUniqueHosts); |
| } |
| |
| assertEquals("Running reducer count doesnt match for" + |
| " jobs with speculation " |
| + speculation, |
| jip.runningReduces(), jip.getRunningReduces().size()); |
| |
| } |
| |
| public void testRunningTaskCount() throws Exception { |
| // test with spec = false |
| testRunningTaskCount(false); |
| |
| // test with spec = true |
| testRunningTaskCount(true); |
| |
| } |
| |
| static void checkTaskCounts(JobInProgress jip, int runningMaps, |
| int pendingMaps, int runningReduces, int pendingReduces) { |
| Counters counter = jip.getJobCounters(); |
| long totalTaskCount = counter.getCounter(JobCounter.TOTAL_LAUNCHED_MAPS) |
| + counter.getCounter(JobCounter.TOTAL_LAUNCHED_REDUCES); |
| |
| LOG.info("totalTaskCount is " + totalTaskCount); |
| LOG.info(" Running Maps:" + jip.runningMaps() + |
| " Pending Maps:" + jip.pendingMaps() + |
| " Running Reds:" + jip.runningReduces() + |
| " Pending Reds:" + jip.pendingReduces()); |
| |
| assertEquals(jip.getNumTaskCompletionEvents(),totalTaskCount); |
| assertEquals(runningMaps, jip.runningMaps()); |
| assertEquals(pendingMaps, jip.pendingMaps()); |
| assertEquals(runningReduces, jip.runningReduces()); |
| assertEquals(pendingReduces, jip.pendingReduces()); |
| } |
| |
| public void testJobSummary() throws Exception { |
| int numMaps = 2; |
| int numReds = 2; |
| JobConf conf = new JobConf(); |
| conf.setNumMapTasks(numMaps); |
| conf.setNumReduceTasks(numReds); |
| // Spying a fake is easier than mocking here |
| MyFakeJobInProgress jspy = spy(new MyFakeJobInProgress(conf, jobTracker)); |
| jspy.initTasks(); |
| TaskAttemptID tid; |
| |
| // Launch some map tasks |
| for (int i = 0; i < numMaps; i++) { |
| jspy.maps[i].setExecStartTime(i + 1); |
| tid = jspy.findAndRunNewTask(true, trackers[i], hosts[i], |
| clusterSize, numUniqueHosts); |
| jspy.finishTask(tid); |
| } |
| |
| // Launch some reduce tasks |
| for (int i = 0; i < numReds; i++) { |
| jspy.reduces[i].setExecStartTime(i + numMaps + 1); |
| tid = jspy.findAndRunNewTask(false, trackers[i], hosts[i], |
| clusterSize, numUniqueHosts); |
| jspy.finishTask(tid); |
| } |
| |
| // Should be invoked numMaps + numReds times by different TIP objects |
| verify(jspy, times(4)).setFirstTaskLaunchTime(any(TaskInProgress.class)); |
| |
| ClusterStatus cspy = spy(new ClusterStatus(4, 0, 0, 0, 0, 4, 4, |
| JobTrackerStatus.RUNNING, 0)); |
| |
| JobInProgress.JobSummary.logJobSummary(jspy, cspy); |
| |
| verify(jspy).getStatus(); |
| verify(jspy).getProfile(); |
| verify(jspy).getJobCounters(); |
| verify(jspy, atLeastOnce()).getJobID(); |
| verify(jspy).getStartTime(); |
| verify(jspy).getFirstTaskLaunchTimes(); |
| verify(jspy).getFinishTime(); |
| verify(jspy).getTasks(TaskType.MAP); |
| verify(jspy).getTasks(TaskType.REDUCE); |
| verify(jspy).getNumSlotsPerMap(); |
| verify(jspy).getNumSlotsPerReduce(); |
| verify(cspy).getMaxMapTasks(); |
| verify(cspy).getMaxReduceTasks(); |
| |
| assertEquals("firstMapTaskLaunchTime", 1, |
| jspy.getFirstTaskLaunchTimes().get(TaskType.MAP).longValue()); |
| assertEquals("firstReduceTaskLaunchTime", 3, |
| jspy.getFirstTaskLaunchTimes().get(TaskType.REDUCE).longValue()); |
| } |
| } |