| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.hadoop.mapred; |
| |
| import java.util.ArrayList; |
| import java.io.File; |
| import java.io.IOException; |
| import java.util.List; |
| |
| import org.apache.hadoop.fs.Path; |
| import org.apache.hadoop.mapred.JobStatusChangeEvent.EventType; |
| import org.apache.hadoop.mapred.lib.IdentityMapper; |
| import org.apache.hadoop.mapred.lib.IdentityReducer; |
| import org.apache.hadoop.mapreduce.Job; |
| import org.apache.hadoop.mapreduce.MapReduceTestUtil; |
| import org.apache.hadoop.mapreduce.TestNoJobSetupCleanup; |
| import org.apache.hadoop.mapreduce.server.jobtracker.JTConfig; |
| import org.apache.commons.logging.Log; |
| import org.apache.commons.logging.LogFactory; |
| |
| import junit.extensions.TestSetup; |
| import junit.framework.Test; |
| import junit.framework.TestCase; |
| import junit.framework.TestSuite; |
| |
| /** |
| * Test whether the JobInProgressListeners are informed as expected. |
| */ |
| public class TestJobInProgressListener extends TestCase { |
| private static final Log LOG = |
| LogFactory.getLog(TestJobInProgressListener.class); |
| private static String TEST_ROOT_DIR = new File(System.getProperty( |
| "test.build.data", "/tmp")).toURI().toString().replace(' ', '+'); |
| private final Path testDir = |
| new Path(TEST_ROOT_DIR, "test-jip-listener-update"); |
| private static MiniMRCluster mr; |
| private static JobTracker jobtracker; |
| private static JobConf conf; |
| private static MyScheduler myScheduler; |
| |
| public static Test suite() { |
| TestSetup setup = |
| new TestSetup(new TestSuite(TestJobInProgressListener.class)) { |
| @Override |
| protected void setUp() throws Exception { |
| conf = new JobConf(); |
| conf.setClass(JTConfig.JT_TASK_SCHEDULER, MyScheduler.class, |
| TaskScheduler.class); |
| mr = new MiniMRCluster(1, "file:///", 1, null, null, conf); |
| jobtracker = mr.getJobTrackerRunner().getJobTracker(); |
| myScheduler = (MyScheduler)jobtracker.getScheduler(); |
| conf = mr.createJobConf(); |
| } |
| |
| @Override |
| protected void tearDown() throws Exception { |
| conf = null; |
| try { |
| mr.shutdown(); |
| } catch (Exception e) { |
| LOG.info("Error in shutting down the MR cluster", e); |
| } |
| jobtracker = null; |
| myScheduler.terminate(); |
| } |
| }; |
| return setup; |
| } |
| |
| /** |
| * This test case tests if external updates to JIP do not result into |
| * undesirable effects |
| * Test is as follows |
| * - submit 2 jobs of normal priority. job1 is a waiting job which waits and |
| * blocks the cluster |
| * - change one parameter of job2 such that the job bumps up in the queue |
| * - check if the queue looks ok |
| * |
| */ |
| public void testJobQueueChanges() throws IOException { |
| LOG.info("Testing job queue changes"); |
| |
| // stop the job initializer |
| myScheduler.stopInitializer(); |
| |
| JobQueueJobInProgressListener myListener = |
| new JobQueueJobInProgressListener(); |
| |
| // add the listener |
| jobtracker.addJobInProgressListener(myListener); |
| |
| Path inDir = new Path(testDir, "input"); |
| Path outputDir1 = new Path(testDir, "output1"); |
| Path outputDir2 = new Path(testDir, "output2"); |
| |
| RunningJob rJob1 = |
| UtilsForTests.runJob(conf, inDir, outputDir1, 1, 0); |
| LOG.info("Running job " + rJob1.getID().toString()); |
| |
| RunningJob rJob2 = |
| UtilsForTests.runJob(conf, inDir, outputDir2, 1, 0); |
| LOG.info("Running job " + rJob2.getID().toString()); |
| |
| // I. Check job-priority change |
| LOG.info("Testing job priority changes"); |
| |
| // bump up job2's priority |
| LOG.info("Increasing job2's priority to HIGH"); |
| rJob2.setJobPriority("HIGH"); |
| |
| // check if the queue is sane |
| assertTrue("Priority change garbles the queue", |
| myListener.getJobQueue().size() == 2); |
| |
| JobInProgress[] queue = |
| myListener.getJobQueue().toArray(new JobInProgress[0]); |
| |
| // check if the bump has happened |
| assertTrue("Priority change failed to bump up job2 in the queue", |
| queue[0].getJobID().equals(rJob2.getID())); |
| |
| assertTrue("Priority change failed to bump down job1 in the queue", |
| queue[1].getJobID().equals(rJob1.getID())); |
| |
| assertEquals("Priority change has garbled the queue", |
| 2, queue.length); |
| |
| // II. Check start-time change |
| LOG.info("Testing job start-time changes"); |
| |
| // reset the priority which will make the order as |
| // - job1 |
| // - job2 |
| // this will help in bumping job2 on start-time change |
| LOG.info("Increasing job2's priority to NORMAL"); |
| rJob2.setJobPriority("NORMAL"); |
| |
| // create the change event |
| JobInProgress jip2 = jobtracker.getJob(rJob2.getID()); |
| JobInProgress jip1 = jobtracker.getJob(rJob1.getID()); |
| |
| JobStatus prevStatus = (JobStatus)jip2.getStatus().clone(); |
| |
| // change job2's start-time and the status |
| jip2.startTime = jip1.startTime - 1; |
| jip2.status.setStartTime(jip2.startTime); |
| |
| |
| JobStatus newStatus = (JobStatus)jip2.getStatus().clone(); |
| |
| // inform the listener |
| LOG.info("Updating the listener about job2's start-time change"); |
| JobStatusChangeEvent event = |
| new JobStatusChangeEvent(jip2, EventType.START_TIME_CHANGED, |
| prevStatus, newStatus); |
| myListener.jobUpdated(event); |
| |
| // check if the queue is sane |
| assertTrue("Start time change garbles the queue", |
| myListener.getJobQueue().size() == 2); |
| |
| queue = myListener.getJobQueue().toArray(new JobInProgress[0]); |
| |
| // check if the bump has happened |
| assertTrue("Start time change failed to bump up job2 in the queue", |
| queue[0].getJobID().equals(rJob2.getID())); |
| |
| assertTrue("Start time change failed to bump down job1 in the queue", |
| queue[1].getJobID().equals(rJob1.getID())); |
| |
| assertEquals("Start time change has garbled the queue", |
| 2, queue.length); |
| } |
| |
| /** |
| * Check the queue status upon |
| * - failed job |
| * - killed job |
| * - successful job |
| */ |
| public void testJobCompletion() throws Exception { |
| MyListener mainListener = new MyListener(); |
| jobtracker.addJobInProgressListener(mainListener); |
| |
| // stop the job initializer |
| myScheduler.stopInitializer(); |
| |
| // check queued jobs |
| testQueuedJobKill(conf, mainListener); |
| |
| myScheduler.startInitializer(); |
| |
| // check the queue state for job states |
| testFailedJob(conf, mainListener); |
| |
| testKilledJob(conf, mainListener); |
| |
| testSuccessfulJob(conf, mainListener); |
| } |
| |
| // A listener that inits the tasks one at a time and also listens to the |
| // events |
| public static class MyListener extends JobInProgressListener { |
| private List<JobInProgress> wjobs = new ArrayList<JobInProgress>(); |
| private List<JobInProgress> rjobs = new ArrayList<JobInProgress>(); |
| // list of job added to the wait queue |
| private List<JobID> wjobsAdded = new ArrayList<JobID>(); |
| // list of job added to the running queue |
| private List<JobID> rjobsAdded = new ArrayList<JobID>(); |
| |
| public boolean contains (JobID id) { |
| return contains(id, true) || contains(id, false); |
| } |
| |
| public boolean contains (JobID id, boolean waiting) { |
| if (!wjobsAdded.contains(id)) { |
| throw new RuntimeException("Job " + id + " not seen in waiting queue"); |
| } |
| if (!waiting) { |
| if (!rjobsAdded.contains(id)) { |
| throw new RuntimeException("Job " + id + " not seen in run queue"); |
| } |
| } |
| List<JobInProgress> queue = waiting ? wjobs : rjobs; |
| for (JobInProgress job : queue) { |
| if (job.getJobID().equals(id)) { |
| return true; |
| } |
| } |
| return false; |
| } |
| |
| public void jobAdded(JobInProgress job) { |
| LOG.info("Job " + job.getJobID().toString() + " added"); |
| wjobs.add(job); |
| wjobsAdded.add(job.getJobID()); |
| } |
| |
| public void jobRemoved(JobInProgress job) { |
| LOG.info("Job " + job.getJobID().toString() + " removed"); |
| wjobs.remove(job); |
| rjobs.remove(job); |
| } |
| |
| public void jobUpdated(JobChangeEvent event) { |
| LOG.info("Job " + event.getJobInProgress().getJobID().toString() + " updated"); |
| // remove the job is the event is for a completed job |
| if (event instanceof JobStatusChangeEvent) { |
| JobStatusChangeEvent statusEvent = (JobStatusChangeEvent)event; |
| if (statusEvent.getEventType() == EventType.RUN_STATE_CHANGED) { |
| // check if the state changes from |
| // RUNNING->COMPLETE(SUCCESS/KILLED/FAILED) |
| JobInProgress jip = event.getJobInProgress(); |
| String jobId = jip.getJobID().toString(); |
| if (jip.isComplete()) { |
| LOG.info("Job " + jobId + " deleted from the running queue"); |
| if (statusEvent.getOldStatus().getRunState() == JobStatus.PREP) { |
| wjobs.remove(jip); |
| } else { |
| rjobs.remove(jip); |
| } |
| } else { |
| // PREP->RUNNING |
| LOG.info("Job " + jobId + " deleted from the waiting queue"); |
| wjobs.remove(jip); |
| rjobs.add(jip); |
| rjobsAdded.add(jip.getJobID()); |
| } |
| } |
| } |
| } |
| } |
| |
| private void testFailedJob(JobConf job, MyListener myListener) |
| throws IOException { |
| LOG.info("Testing job-fail"); |
| |
| Path inDir = new Path(TEST_ROOT_DIR + "/jiplistenerfailjob/input"); |
| Path outDir = new Path(TEST_ROOT_DIR + "/jiplistenerfailjob/output"); |
| |
| job.setNumMapTasks(1); |
| job.setNumReduceTasks(0); |
| job.setMaxMapAttempts(1); |
| |
| // submit a job that fails |
| RunningJob rJob = UtilsForTests.runJobFail(job, inDir, outDir); |
| JobID id = rJob.getID(); |
| |
| // check if the job failure was notified |
| assertFalse("Missing event notification on failing a running job", |
| myListener.contains(id)); |
| |
| // check if failed |
| assertEquals("Job failed!", JobStatus.FAILED, rJob.getJobState()); |
| } |
| |
| private void testKilledJob(JobConf job, MyListener myListener) |
| throws IOException { |
| LOG.info("Testing job-kill"); |
| |
| Path inDir = new Path(TEST_ROOT_DIR + "/jiplistenerkilljob/input"); |
| Path outDir = new Path(TEST_ROOT_DIR + "/jiplistenerkilljob/output"); |
| |
| job.setNumMapTasks(1); |
| job.setNumReduceTasks(0); |
| |
| // submit and kill the job |
| RunningJob rJob = UtilsForTests.runJobKill(job, inDir, outDir); |
| JobID id = rJob.getID(); |
| |
| // check if the job failure was notified |
| assertFalse("Missing event notification on killing a running job", |
| myListener.contains(id)); |
| |
| // check if killed |
| assertEquals("Job failed!", JobStatus.KILLED, rJob.getJobState()); |
| } |
| |
| private void testSuccessfulJob(JobConf job, MyListener myListener) |
| throws Exception { |
| LOG.info("Testing job-success"); |
| |
| Path inDir = new Path(TEST_ROOT_DIR + "/jiplistenerjob/input"); |
| Path outDir = new Path(TEST_ROOT_DIR + "/jiplistenerjob/output"); |
| |
| job.setNumMapTasks(1); |
| job.setNumReduceTasks(0); |
| |
| // submit the job |
| RunningJob rJob = UtilsForTests.runJobSucceed(job, inDir, outDir); |
| |
| // wait for the job to be successful |
| rJob.waitForCompletion(); |
| |
| // check if the job success was notified |
| assertFalse("Missing event notification for a successful job", |
| myListener.contains(rJob.getID())); |
| |
| // check if successful |
| assertEquals("Job failed!", JobStatus.SUCCEEDED, rJob.getJobState()); |
| |
| // test if 0-task jobs with setup-cleanup works fine |
| LOG.info("Testing job with no task job with setup and cleanup"); |
| |
| job.setNumMapTasks(0); |
| job.setNumReduceTasks(0); |
| |
| outDir = new Path(TEST_ROOT_DIR + "/jiplistenerjob/output-no-tasks"); |
| |
| // submit the job |
| rJob = UtilsForTests.runJobSucceed(job, inDir, outDir); |
| |
| // wait for the job to be successful |
| rJob.waitForCompletion(); |
| |
| // check if the job success was notified |
| assertFalse("Missing event notification for a successful job with no tasks", |
| myListener.contains(rJob.getID(), true)); |
| |
| // check if successful |
| assertEquals("Job failed!", JobStatus.SUCCEEDED, rJob.getJobState()); |
| |
| // test if jobs with no tasks (0 maps, 0 red) update the listener properly |
| LOG.info("Testing job with no-set-cleanup no task"); |
| |
| outDir = new Path(TEST_ROOT_DIR + "/jiplistenerjob/output-no-tasks-no-set"); |
| |
| Job j = MapReduceTestUtil.createJob(mr.createJobConf(), inDir, outDir, 0, 0); |
| j.setJobSetupCleanupNeeded(false); |
| j.setOutputFormatClass(TestNoJobSetupCleanup.MyOutputFormat.class); |
| j.submit(); |
| j.waitForCompletion(true); |
| |
| JobID id = (org.apache.hadoop.mapred.JobID)j.getID(); |
| |
| // check if the job is in the waiting queue |
| assertFalse("Missing event notification on no-set-cleanup no task job", |
| myListener.contains(id, true)); |
| |
| // check if the job is successful |
| assertEquals("Job status doesnt reflect success", |
| JobStatus.SUCCEEDED, rJob.getJobState()); |
| } |
| |
| /** |
| * This scheduler never schedules any task as it doesnt init any task. So all |
| * the jobs are queued forever. |
| */ |
| public static class MyScheduler extends JobQueueTaskScheduler { |
| |
| @Override |
| public synchronized void start() throws IOException { |
| super.start(); |
| } |
| |
| void stopInitializer() throws IOException { |
| // Remove the eager task initializer |
| taskTrackerManager.removeJobInProgressListener( |
| eagerTaskInitializationListener); |
| // terminate it |
| eagerTaskInitializationListener.terminate(); |
| } |
| |
| void startInitializer() throws IOException { |
| eagerTaskInitializationListener = |
| new EagerTaskInitializationListener(getConf()); |
| eagerTaskInitializationListener.setTaskTrackerManager(taskTrackerManager); |
| // start it |
| eagerTaskInitializationListener.start(); |
| // add the eager task initializer |
| taskTrackerManager.addJobInProgressListener( |
| eagerTaskInitializationListener); |
| } |
| } |
| |
| private void testQueuedJobKill(JobConf conf, MyListener myListener) |
| throws IOException { |
| LOG.info("Testing queued-job-kill"); |
| |
| Path inDir = new Path(TEST_ROOT_DIR + "/jiplistenerqueuedjob/input"); |
| Path outDir = new Path(TEST_ROOT_DIR + "/jiplistener1ueuedjob/output"); |
| |
| conf.setMapperClass(IdentityMapper.class); |
| conf.setReducerClass(IdentityReducer.class); |
| conf.setNumMapTasks(1); |
| conf.setNumReduceTasks(0); |
| RunningJob rJob = UtilsForTests.runJob(conf, inDir, outDir); |
| JobID id = rJob.getID(); |
| LOG.info("Job : " + id.toString() + " submitted"); |
| |
| // check if the job is in the waiting queue |
| assertTrue("Missing event notification on submiting a job", |
| myListener.contains(id, true)); |
| |
| // kill the job |
| LOG.info("Killing job : " + id.toString()); |
| rJob.killJob(); |
| |
| // check if the job is killed |
| assertEquals("Job status doesnt reflect the kill-job action", |
| JobStatus.KILLED, rJob.getJobState()); |
| |
| // check if the job is correctly moved |
| // from the waiting list |
| assertFalse("Missing event notification on killing a waiting job", |
| myListener.contains(id, true)); |
| } |
| } |